Abstract of the paper
In this research paper, the author applies various econometric time-series models and two machine-learning models (MLPs, LSTMs) to forecast daily data on the yield spread, i.e., the difference between the 10-year Treasury yield and the 3-month Treasury bill rate.
First, we decomposed the yield curve into its principal components, then simulated various paths of the yield spread using the Vasicek model. After constructing univariate ARIMA models and multivariate models such as ARIMAX, VAR and Long Short-Term Memory (LSTM), we calculated the root mean squared error (RMSE) to measure how far the models’ results deviate from the actual values. Through impulse response functions, we measure the impact of various shocks on the differenced yield spread.
The results indicate that the parsimonious univariate ARIMA model (RMSE = 0.05185) outperforms the richly parameterized VAR method (RMSE = 0.4648), and the complex LSTM with multivariate data performs equally well as the simple ARIMA model.
!pip install arch
!pip install fredapi
!pip install quandl
import warnings
warnings.filterwarnings('ignore')
import math
from math import sqrt
import datetime
from datetime import date, timedelta
import numpy as np
import pandas as pd
from pandas import Series
from pandas import DataFrame
import scipy
import scipy.stats as scs
import statsmodels.api as sm
from fbprophet import Prophet # for forecasting via ML called "Prophet"
from fbprophet.plot import plot_plotly
import logging
logging.getLogger().setLevel(logging.ERROR) # mutes unimportant diagnostic messages
import sklearn.decomposition as sck_dec # has PCA functionalities
import sklearn.decomposition.pca as PCA
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler # for standardizing the Data
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import mean_squared_error
from statsmodels.tsa.arima_model import ARIMA
from arch import arch_model # has ARCH and GARCH models
import seaborn as sn
import matplotlib
from matplotlib import pyplot
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
# matplotlib.style.use('dark_background')
matplotlib.style.use('default')
%matplotlib inline
import matplotlib.pyplot as plt
# standardizes the size of all plots, rather than typing plot(figsize = (12,6)) everytime
from pylab import rcParams
rcParams['figure.figsize'] = 12,6
import seaborn as sns
from plotly import __version__
import cufflinks as cf
from IPython.display import display, HTML, Image
import plotly
from plotly.offline import download_plotlyjs, init_notebook_mode, plot, iplot
import plotly.graph_objects as go
import plotly.figure_factory as ff
import plotly.express as px
# set plotly to offline mode for easy reproducibilty
import plotly.offline as py
py.init_notebook_mode()
from statsmodels.tsa.stattools import adfuller,grangercausalitytests
from statsmodels.tsa.stattools import ccovf,ccf,periodogram
from statsmodels.graphics.tsaplots import plot_acf,plot_pacf
# Deep learning packages
import keras
from keras.models import Sequential
from keras.layers import LSTM, Dense
from keras.layers import Dropout
from keras.layers import *
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from keras.callbacks import EarlyStopping
import warnings
warnings.filterwarnings('ignore')
from fredapi import Fred
import quandl
# NOTE(review): hard-coded API credentials are committed in source. These keys
# should be revoked/rotated and loaded from environment variables instead
# (e.g. os.environ['QUANDL_API_KEY']).
quandl.ApiConfig.api_key = '-kesKc4A8jkkBwyX3rSX'
fred = Fred(api_key='08ac10f3d6d00a973c9cdec45990f255')
# Reference dates used to window the API calls below.
today_date = "2020-12-15"
paper_end_date = "2020-08-21"  # end date used in the original paper
#edit end_date to control the end_date of the API call
end_date = today_date
def parser(x):
    """Parse a truncated '%Y-%m' date string by prefixing '190', so that
    '5-01' -> datetime(1905, 1, 1).

    Bug fix: the file imports the *module* (``import datetime``), so the
    class must be reached as ``datetime.datetime`` — the original
    ``datetime.strptime(...)`` raises AttributeError when called.
    """
    return datetime.datetime.strptime('190' + x, '%Y-%m')
# FRED Treasury constant-maturity series (shortest to longest maturity),
# plus the 10y-3m spread (T10Y3M) as the last series.
treas = ['FRED/DGS3MO',
         'FRED/DGS6MO',
         'FRED/DGS1',
         'FRED/DGS2',
         'FRED/DGS3',
         'FRED/DGS5',
         'FRED/DGS7',
         'FRED/DGS10',
         'FRED/DGS20',
         'FRED/DGS30',
         'FRED/T10Y3M'] # 10-Year Treasury Constant Maturity Minus 3-Month Treasury Constant Maturity
# NOTE(review): `date_parser` is not a documented quandl.get() argument —
# confirm it is honoured rather than silently ignored.
data = quandl.get(treas, start_date = "1993-10-01", end_date=end_date,
                  order='asc', collapse = 'daily', date_parser=parser)
# Short column names, in the same order as `treas`.
data.columns = ['tres3mo',
                'tres6mo',
                'tres1y',
                'tres2y',
                'tres3y',
                'tres5y',
                'tres7y',
                'tres10y',
                'tres20y',
                'tres30y',
                'yieldsp']
print(data.shape)
data
data.describe()
# check if data contains nan
np.isnan(data).any()
# fill nan values with cubic-spline interpolation; any +/-inf becomes 0
# (the simpler forward-fill alternative is kept commented out below)
data_df = data.interpolate(method='spline', order=3).replace([np.inf, -np.inf], 0)
#data_df = data.fillna(method='ffill').replace([np.inf, -np.inf], 0)
np.isnan(data_df).any()
# data1 is just the data DataFrame without the yield-spread column
data1 = data_df.loc[:, data.columns != 'yieldsp']
data1.shape
10-Year minus 3-Month Treasury, i.e., the Yield Spread
# 10-Year Treasury Constant Maturity Minus 3-Month Treasury Constant Maturity
def parser(x):
    """Parse a truncated '%Y-%m' date string by prefixing '190', so that
    '5-01' -> datetime(1905, 1, 1).

    Bug fix: with ``import datetime`` (the module), the class must be reached
    as ``datetime.datetime`` — the original ``datetime.strptime(...)`` raises
    AttributeError when called. (This duplicates the earlier `parser`
    definition; kept for notebook cell independence.)
    """
    return datetime.datetime.strptime('190' + x, '%Y-%m')
# Pull the 10y-3m yield spread on its own, over a longer history (from 1982).
yieldsp = quandl.get('FRED/T10Y3M', start_date = "1982-01-04", end_date= end_date,collapse = 'daily',
date_parser=parser)
yieldsp.columns = ['yieldsp']
# quick sanity checks on the series
yieldsp.tail()
yieldsp.shape
yieldsp.describe()
#plotly has some compatibility issues with colab, just taking care of those
import plotly.io as pio
pio.renderers.default = 'notebook'
# Interactive line plot of all maturities (cufflinks .iplot returns a plotly
# Figure when asFigure=True, so the layout can be edited before showing).
tresury_rates_plot = data1.iplot(asFigure=True, kind='scatter', xTitle='Year', yTitle='Treasury Rates')
# centre the title at the top of the figure
tresury_rates_plot.update_layout( title={ 'text': "Treasury Yields of Different Maturities",
'y':0.9, 'x':0.5, 'xanchor': 'center', 'yanchor': 'top'})
tresury_rates_plot.layout.template = 'seaborn'
tresury_rates_plot.show()
# Maturity labels for the surface's x axis.
# fix: the original list had only 9 labels for data1's 10 maturity columns
# ('3-year' was missing), so every maturity from 3y onward was mislabeled.
ids = ['3-month', '6-month', '1-year', '2-year', '3-year',
       '5-year', '7-year', '10-year', '20-year', '30-year']
# Interactive 3D yield-curve surface: maturity (x) vs date (y) vs yield (z).
plotly.offline.iplot({
    "data": [
        go.Surface(
            x=ids,
            y=data1.index,
            z=data1.values,
            # colorscale='Greens',
            # reverse the colour scale (colorbar is shown)
            reversescale=True,
            showscale=True,
            # show hover-highlight contours on the x and y axes only
            contours={
                'x': {
                    'highlight': True,
                    'highlightcolor': 'white',
                    'highlightwidth': 2
                },
                'y': {
                    'highlight': True,
                    'highlightcolor': 'white',
                    'highlightwidth': 3
                },
                'z': {
                    'highlight': False
                }
            })
    ],
    "layout":
        go.Layout(
            title='Interactive 3D Yield Curve',
            width=1200,
            height=800,
            # change aspect ratio and axis titles
            scene={
                'aspectmode': 'manual',
                'aspectratio': {
                    'x': 5,
                    'y': 15,
                    'z': 3
                },
                'yaxis': {
                    'title': ''
                },
                'xaxis': {
                    'title': 'Maturity duration',
                    'tickmode': 'linear',
                    'tickfont': {'size': 10}
                },
                'zaxis': {
                    'title': 'Treasury Yield (in percent)',
                    'tickmode': 'linear',
                    'tickfont': {'size': 10},
                    'showticklabels': True,
                    'showgrid': True
                }
            },
            hoverlabel={'bgcolor': 'turquoise'})
})
# Using graph_objects: line plot of the spread, with the visible x range
# restricted to 2005-2020 (the underlying series starts in 1982).
yield_df = go.Scatter(x = yieldsp.index, y = yieldsp['yieldsp'], line=dict(color='green', width= 1))
layout = go.Layout(title='Yield Spread (10 Year minus 3 Month Treasury Rate) between Jan 1, 2005 and Dec 15, 2020',
                   title_font_size=15,
                   xaxis_title="", yaxis_title="Yield Spread (in percentages)",
                   xaxis_range=[datetime.datetime(2005, 1, 1),
                                datetime.datetime(2020, 12, 15)])
fig = go.Figure(data=[yield_df], layout=layout)
iplot(fig)
PCA extracts the signal and diminishes the dataset's dimensionality. This is because it finds the fewest number of variables that explain the largest proportion of the data.
The author has decomposed the data into 5 principal components and has then calculated the proportion of total variance explained by each of the components.
# Matrix of treasury yields (spread excluded) used for the PCA.
X = data1.values
# StandardScaler centres each column to zero mean and scales to unit variance
# (it does NOT make the data normally distributed).
sc = StandardScaler().fit_transform(X)
# replace the nan values in the sc array to 0
sc[np.isnan(sc)] = 0
# keep the first 5 principal components
pca = PCA(n_components = 5)
pcs = pca.fit_transform(sc)
pc_df = pd.DataFrame(data = pcs, columns = ['PC1', 'PC2', 'PC3', 'PC4', 'PC5'])
pc_df['PC2'] = pc_df['PC2']*(-1) # the negative of PC2 is highly correlated to the yield spread
# Re-attach the dates plus the 10y yield and spread for comparison plots.
# NOTE(review): merging `on=data.index` produces a 'key_0' column that is
# renamed to 'Date' — this relies on positional alignment of pc_df with data;
# verify if the data window changes.
pc_yieldsp = pd.merge(pc_df, data[['tres10y', 'yieldsp']],
                      on = data.index).rename(columns= {'key_0': 'Date'}).set_index('Date')
pc_yieldsp.head()
# Line plot of the first three principal components over time.
fig_pc = pc_yieldsp[['PC1', 'PC2', 'PC3']].iplot(asFigure=True,
                                                 kind='scatter', xTitle='Year', yTitle='Principal Component')
# centre the title at the top of the figure
fig_pc.update_layout(
    title={
        'text': "Three Principal Components (PC) that Constitute the Yield Curve",
        'y':0.9,
        'x':0.5,
        'xanchor': 'center',
        'yanchor': 'top'})
fig_pc.show()
We can calculate the proportion of the total variance explained by each $PC_i$ by the relation:
$$V (PC_i) = \frac{\lambda_i}{\lambda_1+\lambda_2+...+\lambda_5}$$
# returns the percentage of variance explained by each of the selected components
# Proportion of total variance explained by each of the 5 components.
ratio = pca.explained_variance_ratio_
ratio_df = pd.DataFrame(ratio, columns=['Percentage of variance explained by component'])
print(ratio_df)
# Scree plot of the explained-variance ratios.
fig_ratio = ratio_df.iplot(asFigure=True, kind='scatter', #layout = layout,
                           xTitle='Principal Component', yTitle='Eigenvalue (Proportion)')
fig_ratio.update_layout(
    title={
        'text': "Scree Plot: Proportion of Total Variance Explained by each PC",
        'y':0.9,
        'x':0.5,
        'xanchor': 'center',
        'yanchor': 'top'})
fig_ratio.show()
The plot of the first principal component looks very similar to the actual 10-year yield curve. This is consistent with our expectation as the first principal component explains 93.52% of the data.
# Compare PC1 against the 10-year Treasury yield.
fig_pc1 = pc_yieldsp[['PC1','tres10y']].iplot(asFigure=True,
                                              kind='scatter', xTitle='Year', yTitle='Rates')
fig_pc1.update_layout(
    title={
        'text': "PC1 and 10-year Treasury Yield",
        'y':0.9,
        'x':0.5,
        'xanchor': 'center',
        'yanchor': 'top'})
fig_pc1.show()
The second principal component depicts the slope, which in this case is the difference between the 10-year Treasury bond and the 3-month Treasury bill, also called the yield spread. Visually, the slope appears nearly identical to $PC_2$.
# Compare PC2 against the 10y-3m yield spread (the "slope" factor).
fig_pc2 = pc_yieldsp[['PC2','yieldsp']].iplot(asFigure=True,
                                              kind='scatter', xTitle='Year', yTitle='Rates')
fig_pc2.update_layout(
    title={
        'text': "PC2 and Yield Spread (10y-3m Treasury Rate)",
        'y':0.9,
        'x':0.5,
        'xanchor': 'center',
        'yanchor': 'top'})
fig_pc2.show()
The high correlation of 0.916 between the 10Y-3M spread and $PC_2$ corroborates the evidence that the second principal component denotes the slope.
#Correlation matrix between the first three PCs, the 10y yield, and the spread.
# fix: the same `pc_yieldsp[...].corr()` expression was recomputed four times
# with a duplicated column list; it is now computed once and reused.
corr_cols = ['PC1', 'PC2', 'PC3', 'tres10y', 'yieldsp']
corr = pc_yieldsp[corr_cols].corr()
corr  # display the correlation table (notebook cell output)
f = plt.figure(figsize=(8, 6))
plt.matshow(corr, fignum=f.number)
plt.xticks(range(corr.shape[1]), corr.columns, fontsize=14, rotation=45)
plt.yticks(range(corr.shape[1]), corr.columns, fontsize=14)
cb = plt.colorbar()
cb.ax.tick_params(labelsize=14)
plt.title('Correlation Matrix', fontsize=16);
This method employs maximum likelihood estimation to derive the parameters of the Vasicek model, which is of the form:
$$ dr_t = k(\theta-r_t) dt + \sigma dW_t $$
where $k=$ strength of mean reversion, or the speed at which the yield spread rates revert to the mean $\theta$;
$\theta$ is the level of mean reversion or the long-run level of yield spread
$\sigma$ is the volatility in yield spread at time t
$r_t $ is the short rate (yield spread) at time t
$k(\theta - r_t)$ are the expected changes in the yield spread or the drift term, also known as the mean reversion for Vasicek model, and
$W$ is the random market risk that follows a Wiener process.
Using the closed form solution given below that avoids “discretization errors”, we have simulated the paths of the yield spread:
$$ r_{t_i} = r_{t_{i-1}} e^{-k(t_i-t_{i-1})} + \theta (1- e^{-k(t_i-t_{i-1})}) + Z \sqrt{\frac{\sigma^2 (1- e^{-2k(t_i-t_{i-1})} )}{2k}} $$
where $Z \sim N(0,1)$
# helper functions for Vasicek interest rate model
def VasicekNextRate(r, kappa, theta, sigma, dt=1/252):
    """Draw the next short rate from the exact (discretization-free) Vasicek
    transition density, starting from rate ``r`` over a time step ``dt``."""
    decay = np.exp(-kappa * dt)                # e^{-k * dt}
    noise_var = (sigma**2) * (1 - decay**2) / (2 * kappa)
    conditional_mean = r * decay + theta * (1 - decay)
    return conditional_mean + np.sqrt(noise_var) * np.random.normal()
def VasicekSim(N, r0, kappa, theta, sigma, dt = 1/252):
    """Simulate one Vasicek path of N rates, starting at ``r0``.

    Bug fix: in the original, the initialisation ``short_r[0] = r0`` had been
    swallowed into a trailing comment on the allocation line, so every
    simulated path silently started at 0 and ``r0`` was never used.
    """
    short_r = [0]*N     # pre-allocate the path
    short_r[0] = r0     # initialise the path at the last observed rate
    for i in range(1,N):
        short_r[i] = VasicekNextRate(short_r[i-1], kappa, theta, sigma, dt)
    return short_r
def VasicekMultiSim(M, N, r0, kappa, theta, sigma, dt = 1/252):
    """Simulate M independent Vasicek paths of length N.

    Returns an (N, M) array with one path per column.
    """
    paths = np.ndarray((N, M))
    for col in range(M):
        paths[:, col] = VasicekSim(N, r0, kappa, theta, sigma, dt)
    return paths
def VasicekCalibration(rates, dt=1/252):
    """Calibrate the Vasicek parameters by exact maximum likelihood on the
    AR(1) representation of the observed series.

    Returns ``[kappa, theta, sigma, r0]`` where ``r0`` is the last
    observation, used as the simulation starting point.
    """
    n = len(rates)
    lagged = rates[0:(n-1)]     # x_{t-1}
    current = rates[1:n]        # x_t
    # Sufficient statistics of the (lagged, current) pairs.
    Sx = sum(lagged)
    Sy = sum(current)
    Sxx = np.dot(lagged, lagged)
    Sxy = np.dot(lagged, current)
    Syy = np.dot(current, current)
    # Long-run mean (MLE closed form).
    theta = (Sy * Sxx - Sx * Sxy) / (n * (Sxx - Sxy) - (Sx**2 - Sx*Sy))
    # Mean-reversion speed, from the implied AR(1) coefficient a = e^{-k dt}.
    kappa = -np.log((Sxy - theta * Sx - theta * Sy + n * theta**2) / (Sxx - 2*theta*Sx + n*theta**2)) / dt
    a = np.exp(-kappa * dt)
    # Residual variance of the discrete-time regression, then annualised.
    sigmah2 = (Syy - 2*a*Sxy + a**2 * Sxx - 2*theta*(1-a)*(Sy - a*Sx) + n*theta**2 * (1-a)**2) / n
    sigma = np.sqrt(sigmah2 * 2 * kappa / (1 - a**2))
    return [kappa, theta, sigma, rates[n-1]]
# Calibrate the Vasicek parameters on the yield spread expressed in decimals.
params = VasicekCalibration(yieldsp.loc[:, 'yieldsp'].dropna()/100)
kappa = params[0]
theta = params[1]
sigma = params[2]
r0 = params[3]  # last observed spread, used as the simulation start
# Simulate one year of daily (1/252) steps and plot a single test path.
years = 1
N = years * 252
t = np.arange(0,N)/252
test_sim = VasicekSim(N, r0, kappa, theta, sigma, 1/252)
plt.plot(t,test_sim)
plt.ylabel("Yield Spread")
plt.xlabel("Years")
plt.show()
Simulating from $r_0$ = last observed value and assuming $\theta = 1.75 \% $, we have generated a sequence of 10 ex-ante trajectories of the yield spread.
M = 10  # number of simulated paths
rates_arr = VasicekMultiSim(M, N, r0, kappa, theta, sigma)
plt.plot(t,rates_arr)
# dashed horizontal reference line at the calibrated long-run mean theta
plt.hlines(y=theta, xmin = -100, xmax=100, zorder=10, linestyles = 'dashed', label='Theta')
plt.annotate('Theta', xy=(1.0, theta + 0.0005))
plt.xlim(-0.05, 1.05)
plt.ylabel("Yield Spread")
plt.xlabel("Years")
# NOTE(review): the title hard-codes theta = 1.75%, but theta above comes from
# the calibration — confirm the two agree before quoting this figure.
plt.title("Long-run (mean reversion level) yield spread, theta = 1.75% ")
plt.show()
We observe the model's mean reverting nature by specifying $r_0$ further away from $\theta$. Over time this pulls the yield spread towards $\theta$.
# Start the paths at r0 = -1%, far below theta, to visualise the pull back
# toward the long-run mean.
M = 10
rates_arr = VasicekMultiSim(M, N, -0.01, kappa, theta, sigma)
plt.plot(t,rates_arr)
plt.hlines(y=theta, xmin = -100, xmax=100, zorder=10, linestyles = 'dashed', label='Theta')
plt.annotate('Theta', xy=(1.0, theta+0.0005))
plt.xlim(-0.05, 1.05)
plt.ylabel("Yield Spread")
plt.xlabel("Years")
plt.title("Last observed value, r0, is further away from theta to -1% ")
plt.show()
The magnitude of $\kappa$ controls the speed of the reversion. As $\kappa$ grows, mean reversion quickens.
# Scale kappa by 10 to show that a larger mean-reversion speed pulls the
# paths back to theta much faster.
M = 10
rates_arr = VasicekMultiSim(M, N, -0.01, kappa*10, theta, sigma)
plt.plot(t,rates_arr)
plt.hlines(y=theta, xmin = -100, xmax=100, zorder=10, linestyles = 'dashed', label='Theta')
plt.annotate('Theta', xy=(1.0, theta+0.0005))
plt.xlim(-0.05, 1.05)
plt.ylabel("Yield Spread")
plt.xlabel("Years")
plt.title("Kappa (mean reversion speed) scaled by 10 times")
plt.show()
Likewise, a larger $\sigma$ widens the volatility and the potential distribution of rates.
# Scale kappa by 2 and sigma by 5 to show how higher volatility widens the
# dispersion of the simulated paths.
M = 10
rates_arr = VasicekMultiSim(M, N, -0.01, kappa*2, theta, sigma*5)
plt.plot(t,rates_arr)
plt.hlines(y=theta, xmin = -100, xmax=100, zorder=10, linestyles = 'dashed', label='Theta')
plt.annotate('Theta', xy=(1.0, theta+0.0005))
plt.xlim(-0.05, 1.05)
plt.ylabel("Yield Spread")
plt.xlabel("Years")
plt.title("Kappa and sigma (volatility in yield spread) scaled by 2 and 5 times, respectively")
plt.show()
We first check for stationarity of the yield spread using the Augmented Dickey Fuller Test.
import statsmodels.tsa.api as smt
'''
Test stationarity by:
- running Augmented Dickey-Fuller test with 95%
- plotting mean and variance of a sample from data
- plotting autocorrelation and partial autocorrelation
'''
def test_stationarity_acf_pacf(ts, sample=0.20, maxlag=30, figsize= (18,12)):
    """Plot a stationarity dashboard for ``ts`` and run an ADF test.

    Draws, in one figure: the series with the mean +/- one-std band of the
    first ``sample`` fraction of observations, the PACF and ACF out to
    ``maxlag`` lags, a QQ plot, and a probability (PP) plot.

    Fixes vs. the original:
    * the titles referenced a global ``variable`` leaked from the calling
      loop; the series' own ``name`` attribute is used instead, so the
      function no longer depends on caller state (output is unchanged for
      the existing callers, which pass ``df[col]`` whose name is ``col``);
    * ``plt.style.context(...)`` was called without ``with`` — a no-op —
      and has been removed;
    * ``lower_limit``/``upper_limit`` were computed with swapped signs
      (lower = mean + std); the names now match the values. The rendered
      band is identical, as fill_between is order-insensitive.
    """
    name = str(getattr(ts, "name", "") or "")
    # Figure layout: series on top (full width), PACF/ACF in the middle row,
    # QQ/PP plots on the bottom row.
    stationarity_fig = plt.figure(figsize=figsize, dpi = 200)
    layout = (3, 2)
    ts_plot = plt.subplot2grid(layout, loc=(0,0), colspan=2)
    pacf_plot = plt.subplot2grid(layout, loc=(1,0))
    acf_plot = plt.subplot2grid(layout, loc=(1,1))
    qq_plot = plt.subplot2grid(layout, loc=(2, 0))
    pp_plot = plt.subplot2grid(layout, loc=(2, 1))
    ## plot ts with mean/std of a sample from the first x%
    df_ts = ts.to_frame(name="ts")
    sample_size = int(len(ts)*sample)
    head = df_ts["ts"].head(sample_size)
    df_ts["mean"] = head.mean()
    df_ts["lower_limit"] = head.mean() - head.std()
    df_ts["upper_limit"] = head.mean() + head.std()
    df_ts["ts"].plot(ax=ts_plot, color="black", legend=False)
    df_ts["mean"].plot(ax=ts_plot, legend=False, color="red")
    ts_plot.fill_between(x=df_ts.index, y1=df_ts['lower_limit'],
                         y2=df_ts['upper_limit'], color='paleturquoise', alpha=0.4)
    # re-draw the sample window band on top, in a stronger colour
    df_ts["mean"].head(sample_size).plot(ax=ts_plot,
                                         legend=False, color="red", linewidth=0.9)
    ts_plot.fill_between(x=df_ts.head(sample_size).index,
                         y1=df_ts['lower_limit'].head(sample_size),
                         y2=df_ts['upper_limit'].head(sample_size),
                         color='lightskyblue')
    ## test stationarity (Augmented Dickey-Fuller); lag length chosen by AIC
    adfuller_test = sm.tsa.stattools.adfuller(ts, maxlag=maxlag, autolag="AIC")
    adf, p, critical_value = adfuller_test[0], adfuller_test[1], adfuller_test[4]["5%"]
    p = round(p, 3)
    conclusion = " Stationary" if p < 0.05 else " Non-Stationary"
    ts_plot.set_title('Augmented Dickey-Fuller Test 95% of '+ name + ':' + conclusion +
                      ' (p value: '+str(p)+')')
    ## pacf (suggests the AR order) and acf (suggests the MA order)
    smt.graphics.plot_pacf(ts, lags=maxlag, ax = pacf_plot,
                           title="Partial Autocorrelation Function (PACF) of " + name)
    smt.graphics.plot_acf(ts, lags=maxlag, ax=acf_plot,
                          title="Autocorrelation Function (ACF) of "+ name)
    sm.qqplot(ts, line='s', ax=qq_plot)
    qq_plot.set_title('QQ Plot of ' + name)
    scs.probplot(ts, sparams=(ts.mean(), ts.std()), plot=pp_plot)
    # plt.suptitle("Stationarity analysis for {}".format(name))
    plt.tight_layout()
    plt.show()
# Run the stationarity dashboard for every column of the spread frame.
for variable in yieldsp.columns:
    test_stationarity_acf_pacf(yieldsp[variable], sample=0.20, maxlag=20)
A low p-value of 0.007 indicates that yieldsp
is stationary, i.e., its mean, variance, and autocovariance are constant over time.
The ACF plot shows a strong autocorrelation of lags as spikes very gradually reduce.
The PACF shows a significant lag for perhaps 3 days, with significant lags spotty out to perhaps 20 days. We can try MA(4).
# First-difference the spread and re-run the stationarity dashboard.
diff = yieldsp['yieldsp'].diff().to_frame()
# drop the leading NaN created by .diff(); replace any +/-inf with 0
yield_diff = diff.dropna(inplace = False).replace([np.inf, -np.inf], 0).dropna(axis=1)
for variable in yield_diff.columns:
    test_stationarity_acf_pacf(yield_diff[variable], sample=0.20, maxlag=20)
From the ACF and PACF plots of the differenced yield curve, we can try AR(3) and MA(3).
# 95/5 chronological train/test split on the leveled spread.
X = yieldsp.values
size = int(len(X) * 0.95)
train, test = X[0:size], X[size:len(X)]
# fit the ARIMA(1,0,3) model on the training window
# NOTE(review): statsmodels.tsa.arima_model.ARIMA (with .fit(disp=0)) is the
# legacy API, removed in statsmodels >= 0.13 — migrate to
# statsmodels.tsa.arima.model.ARIMA if the environment is upgraded.
model = ARIMA(train, order=(1,0,3))
model_fit_arima = model.fit(disp=0)
print(model_fit_arima.summary())
Fitting the ARIMA(1, 0, 3) model yields the following equation with an AIC of −20,952.034:
$$ yieldsp_t = 1.7712 + 0.9978 yieldsp_{t-1} + 0.0438 \epsilon_{t-1} - 0.0211 \epsilon_{t-2} - 0.0696 \epsilon_{t-3} $$
Plotting residuals from ARIMA(1,0,3) on the yield spread
# plot residual errors from the leveled ARIMA(1,0,3) fit
resid_arima = DataFrame(model_fit_arima.resid, columns = ['resid_arima'])
fig_res = resid_arima.iplot(asFigure=True, kind='scatter', xTitle='Index', yTitle='Residuals')
# centre the title at the top of the figure
fig_res.update_layout(
    title={
        'text': "Residuals from ARIMA(1,0,3)",
        'y':0.9,
        'x':0.5,
        'xanchor': 'center',
        'yanchor': 'top'})
fig_res.show()
# Distribution view of the ARIMA(1,0,3) residuals: histogram + KDE + rug.
hist_data = [resid_arima['resid_arima']]
group_labels = ['distplot'] # name of the dataset
fig_kde = ff.create_distplot(hist_data, group_labels, bin_size=0.01)
fig_kde.update_layout(
    title={
        # fix: "Residuls" -> "Residuals" in the user-facing title
        'text': "Histogram, KDE and a Rug Plot of Residuals from ARIMA(1,0,3)",
        'y':0.9,
        'x':0.5,
        'xanchor': 'center',
        'yanchor': 'top'})
fig_kde.show()
print(resid_arima.describe())
The plots suggest that the errors are Gaussian with a mean centered at approximately zero (-0.000066), indicating no bias in the predictions. The line plot of residuals suggests that the model captures the trend information.
# we perform a rolling forecast i.e. re-create the ARIMA forecast when a new observation is received.
X = yieldsp.values
size = int(len(X) * 0.95)
train, test = X[0:size], X[size:len(X)]
history = [x for x in train]   # growing window of all observations seen so far
predictions = list()
# walk-forward validation
# NOTE(review): the model is re-estimated from scratch at every step — one
# full ARIMA fit per test observation; slow, but faithful to a live setup.
for t in range(len(test)):
    model = ARIMA(history, order=(1,0,3))
    model_fit = model.fit(disp=0)
    output = model_fit.forecast()   # one-step-ahead forecast
    yhat = output[0]
    predictions.append(yhat)
    obs = test[t]
    history.append(obs)             # reveal the true value for the next refit
    print('predicted=%f, expected=%f' % (yhat, obs))
pred = pd.DataFrame(predictions, columns = ['pred'])
print("Shape of predictions dataframe is {}".format(pred.shape))
#yieldsp.iloc[-len(test):,]
# NOTE(review): hard-coded calendar index — assumes the test window is exactly
# 2019-08-16 .. 2020-12-15 in daily steps; confirm len(pred) matches.
pred.index = pd.date_range(start="2019-08-16",end="2020-12-15" )
test = pd.DataFrame(test, columns = ['test_yieldsp'])
# print(pred.shape)
test.index = pd.date_range(start="2019-08-16",end="2020-12-15")
# print(test.shape)
# combine the predictions from leveled ARIMA with the test values
merge = pd.merge(pred, test, how='inner', left_index=True, right_index=True)
merge.tail()
# print(merge.shape)
# out-of-sample accuracy of the leveled model
error = sqrt(mean_squared_error(merge['test_yieldsp'], merge['pred']))
print('Test RMSE from the leveled model - ARIMA(1,0,3): %.5f' % error)
Thus, the final RMSE for the ARIMA(1,0,3) model is 0.05110.
# fix: a stray markdown heading ("yield_diff") had been fused onto the first
# line of this cell during export; the code below is the intended content.
X_diff = yield_diff.values
size = int(len(X_diff) * 0.95)
train_diff, test_diff = X_diff[0:size], X_diff[size:len(X_diff)]
history = [x for x in train_diff]
predictions = list()
# fit ARIMA(3,1,3) on the (already differenced) spread
model_diff = ARIMA(train_diff, order=(3,1,3))
mod_fit_diff = model_diff.fit(disp=0)
print(mod_fit_diff.summary())
Fitting the ARIMA(3, 1, 3) model on the differenced yield spread yields the following equation with an AIC of −21,141.512:
$$ \Delta yieldsp_t = -7.392*10^{-8} + 0.1647 \Delta yieldsp_{t-1} + 0.1576 \Delta yieldsp_{t-2} -0.0594 \Delta yieldsp_{t-3} - 1.1186 \epsilon_{t-1} - 0.0759 \epsilon_{t-2} + 0.1945 \epsilon_{t-3} $$
Plotting residuals from ARIMA(3,1,3) on the differenced yield spread
# plot residual errors from the differenced ARIMA(3,1,3) fit
resid_diff = DataFrame(mod_fit_diff.resid, columns = ['resid_diff'])
fig_res = resid_diff.iplot(asFigure=True, kind='scatter', xTitle='Index', yTitle='Residuals')
# centre the title at the top of the figure
fig_res.update_layout(
    title={
        'text': "Residuals from Differenced ARIMA(3,1,3)",
        'y':0.9,
        'x':0.5,
        'xanchor': 'center',
        'yanchor': 'top'})
fig_res.show()
# Distribution view of the differenced-model residuals.
hist_data = [resid_diff['resid_diff']]
group_labels = ['distplot'] # name of the dataset
fig_kde = ff.create_distplot(hist_data, group_labels, bin_size=0.01)
fig_kde.update_layout(
    title={
        # fix: "Residuls" -> "Residuals" in the user-facing title
        'text': "Histogram, KDE and a Rug Plot of Residuals from Differenced ARIMA(3,1,3)",
        'y':0.9,
        'x':0.5,
        'xanchor': 'center',
        'yanchor': 'top'})
fig_kde.show()
print(resid_diff.describe())
# we perform a rolling forecast i.e. re-create the ARIMA forecast when a new observation is received.
X = yieldsp['yieldsp'].values   # leveled (undifferenced) spread
size = int(len(X) * 0.95)
# NOTE(review): despite the *_diff names, this splits the LEVELED series; the
# differencing itself happens below via difference().
train_diff, test_diff = X[0:size], X[size:len(X)]
# NOTE(review): history is seeded from `train` (the earlier leveled split of
# the same series) — invert_diff() needs levels, so this works, but the
# cross-cell dependency and naming are fragile.
history = [x for x in train]
predictions_diff = list()
gap = 1  # differencing interval used when inverting forecasts back to levels
def difference(dataset, interval=1):
    """Difference ``dataset`` at lag ``interval``: out[i] = x[i+interval] - x[i].

    The result is shorter than the input by ``interval`` observations
    (equivalent to np.diff for interval=1).
    """
    return np.array([dataset[i] - dataset[i - interval]
                     for i in range(interval, len(dataset))])
# Difference the full leveled series once (lag 1).
diff = difference(X, 1)
def invert_diff(history, yhat, interval=1):
    # Undo the differencing: add the forecasted change to the last observed level.
    return yhat + history[-interval]
# walk-forward validation on the differenced series
# NOTE(review): the model is re-fit on the same fixed `diff` array every
# iteration (new observations are appended to `history` but never to `diff`),
# and the loop length uses `test` rather than `test_diff` — the two happen to
# have the same length here. Confirm both are intended.
for t in range(len(test)):
    # NOTE(review): order=(3,1,3) on an already-differenced series applies a
    # second difference — the surrounding text describes ARIMA(3,1,3) applied
    # once; verify the intended specification.
    model_diff = ARIMA(diff, order=(3,1,3))
    model_diff_fit = model_diff.fit(disp=0)
    output = model_diff_fit.forecast()   # one-step-ahead forecast (a change)
    yhat = output[0]
    # map the predicted change back to a level using the last observed level
    inverted = invert_diff(history, yhat, gap)
    predictions_diff.append(inverted)
    obs = test_diff[t]
    history.append(obs)
    print('predicted=%f, expected=%f' % (inverted, obs))
# Collect the re-leveled forecasts and align them with the test values.
pred_inv = pd.DataFrame(predictions_diff, columns = ['pred_diff'])
# NOTE(review): hard-coded calendar index — assumes len(pred_inv) matches.
pred_inv.index = pd.date_range(start="2019-08-16",end="2020-12-15" )
merge_inv = pd.merge(pred_inv, merge, how='inner', left_index=True, right_index=True)
merge_inv.tail()
# calculate RMSE of the differenced model against the actual spread
rmse = sqrt(mean_squared_error(merge_inv['test_yieldsp'], merge_inv['pred_diff']))
print('Test RMSE from the differenced model - ARIMA(3,1,3): %.5f' % rmse)
Thus, the final RMSE for the differenced ARIMA(3,1,3) model is 0.05147.
Descriptive statistics of the residuals from both ARIMA models
| Residuals from ARIMA(1, 0, 3) | Residuals from Difference ARIMA(3, 1, 3) | |
|---|---|---|
| mean | -0.000066 | 0.000555 |
| std | 0.077213 | 0.077457 |
| min | -1.214803 | -1.136089 |
| 25% | -0.037550 | -0.037421 |
| 50% | -0.001489 | -0.000781 |
| 75% | 0.036286 | 0.037110 |
| max | 0.753097 | 0.750472 |
Plotting the ARIMA(1,0,3) model on yield spread vs ARIMA(3,1,3) model on the differenced yield spread
# line plot shows that compares the actual yieldsp with rolling forecast predictions from leveled and diff ARIMA
fig_arima_diff = merge_inv.iplot(asFigure=True, kind='scatter', xTitle='Date',
yTitle='Predicted and Actual Yield Spread')
fig_arima_diff.update_layout(
title={
'text': "Actual and Forecasted Yield Spread from ARIMA(1,0,3) and ARIMA(3,1,3)",
'y':0.9,
'x':0.5,
'xanchor': 'center',
'yanchor': 'top'})
fig_arima_diff.show()
The author uses (G)ARCH models that treat heteroskedasticity by modeling the variance.
Empirically, after constructing ARIMA, we can use (G)ARCH to model the expected variance on the residuals,
provided that the residuals are not serially correlated and don't have any trend and seasonal patterns.
When fitting an AR(p), we observe the decay of the $p$ lag displayed in the ACF plot of the residuals and the squared residuals. To apply ARCH we need to ensure that the mean of the residuals = 0.
# Square the ARIMA residuals to look for ARCH effects (volatility clustering).
resid_arima['resid_arima_squared'] = resid_arima['resid_arima']**2
resid_arima.head()
Checking stationarity for resid_arima and resid_arima_squared
# Stationarity dashboards for the residuals and the squared residuals.
for variable in resid_arima.columns:
    test_stationarity_acf_pacf(resid_arima[variable], sample=0.20, maxlag=30)
Whilst the residuals from ARIMA(1,0,3) appear to be white noise process, the squared residuals are highly autocorrelated.
The slow decay of successive lags indicate conditional heteroskedasticity.
Given that the lags in both the PACF and ACF plots are significant, the model would be better fit with both AR and MA parameters. So, we have fit a GARCH(1,3) model and checked how the residuals from GARCH(1,3) behave.
# Now we can fit the arch model using the best fit arima model parameters
# NOTE(review): power=1.0 fits an absolute-value (AVGARCH-style) variance
# specification, not the standard squared-error GARCH(1,3) described in the
# text — confirm this is intended.
ar = arch_model(resid_arima['resid_arima'], p=1, o=0, q=3, power=1.0)
res = ar.fit(update_freq=5)  # print optimizer progress every 5 iterations
print(res.summary())
We can see that the true parameters: $\Omega, \alpha, \beta_1, \beta_2, \beta_3$, fall within the respective confidence intervals.
Thus, we can write GARCH(1, 3) as:
$$ \sigma_t^2 = 1.7556*10^{-03} + 0.1142 \sigma_{t-1}^2 + 0.8 \epsilon_{t-1}^2 + 7.4291*10^{-12} \epsilon_{t-2}^2 + 0.0859 \epsilon_{t-3}^2 $$
resid_garch = pd.DataFrame(res.resid).rename(columns = {'resid': 'resid_garch'})
# Square the GARCH residuals to check whether serial correlation remains.
resid_garch['resid_garch_sq'] = resid_garch['resid_garch']**2
resid_garch.head()
# Stationarity dashboards for resid_garch and resid_garch_sq.
# fix: a stray markdown heading ("resid_garch and resid_garch_squared") had
# been fused onto the `for` line during export.
for variable in resid_garch.columns:
    test_stationarity_acf_pacf(resid_garch[variable], sample=0.20, maxlag=30)
So, GARCH(1, 3) has not "explained" the serial correlation present in the squared residuals, inhibiting us from predicting in the test set.
The author uses the following exogenous variables to forecast the yield spread:
# Exogenous regressors used below: sahm, forward1yr, rec_ind, termpr, infexp,
# vix, ted, 1yr-ff.
# fix: that markdown list had been fused onto the `def` line during export.
def parser(x):
    """Parse a truncated '%Y-%m' string by prefixing '190' ('5-01' -> 1905-01-01).

    Fix: with ``import datetime`` (the module), the class must be reached as
    ``datetime.datetime`` — the original ``datetime.strptime`` raises
    AttributeError when called.
    """
    return datetime.datetime.strptime('190' + x, '%Y-%m')
# Weekly FRED series (collected here for reference).
# NOTE(review): `week` is defined but never used in this section.
week = ["FRED/CCSA", # UI claims - weekly;
        "FRED/WALCL", # Total Assets (Less Eliminations From Consolidation) - weekly;
        "FRED/RPONTSYD"] # Overnight Repurchase Agreements: Treasury Securities Purchased by Fed in OMO - weekly
# Daily FRED series pulled via quandl.
daily = [
    "FRED/T10Y3M", # 10 year treasury constant maturity - 3 month tres cons mat
    "FRED/TEDRATE", # TED Spread - daily ;
    "FRED/THREEFF1" , # Fitted Instantaneous Forward Rate 1 Year Hence - daily;
    "FRED/USRECD", # NBER based recession indicator - daily
    "FRED/T1YFF" , # 1-Year Treasury Constant Maturity Minus Federal Funds Rate
    "FRED/THREEFYTP10" ] # Term Premium on a 10 Year Zero Coupon Bond - daily
yieldd = quandl.get(daily, start_date = "1990-01-01", end_date="2020-10-01", collapse = 'daily',date_parser=parser)
# Short names, in the same order as `daily` above.
yieldd.columns = [
    'yieldsp',
    'ted',
    'forward1yr',
    'rec_ind',
    '1yr-ffr',
    'termpr'
]
# back-fill gaps: use the next valid observation to fill missing values
yieldd = yieldd.fillna(method = 'bfill')
yieldd.tail()
# Lower-frequency macro series pulled directly from FRED.
fredf = {}
fredf['sahm'] = fred.get_series('SAHMREALTIME', observation_start='1990-01-01', observation_end='2020-10-01')
fredf['infexp'] = fred.get_series('MICH', observation_start='1990-01-01', observation_end='2020-10-01')
fredf['vix'] = fred.get_series('VIXCLS', observation_start='1990-01-01', observation_end='2020-10-01')
fredf = pd.DataFrame(fredf)
# apply cubic spline interpolation to get daily value of sahm rule from monthly values
# bfill = use next valid observation to fill gap.
upsample = fredf.resample('D').interpolate(method='cubic').fillna(method = 'bfill')
upsample.index.name = 'Date'
upsample.tail()
# join daily market data with the upsampled macro series on the Date index
yield_d = pd.merge(yieldd, upsample, on = 'Date')
yield_d.head()
yield_d.isnull().sum()
We use a goodness-of-fit measure, the D’Agostino’s $K^2$ test, to check whether the sample data on the yield spread arises from a Gaussian-distributed population. Transforming the sample skewness and kurtosis derives the test statistic.
Test statistic:
Ho: data is normally distributed
Ha: data is kurtic and/or skewed (not normally distributed)
# D'Agostino's K^2 normality test on the yield spread.
from scipy import stats
stat, p = stats.normaltest(yield_d.yieldsp)
print('Statistics=%.3f, p=%.3f' % (stat, p))
alpha = 0.05  # 5% significance level
if p > alpha:
    print('Data looks Gaussian (fail to reject H0)')
else:
    print('Data does not look Gaussian (reject H0)')
The small p-value of 0.00 rejects the null hypothesis. Hence, the sample of yield spread is not normally distributed.
# Distribution view of the yield spread: histogram + KDE + rug.
hist_data = [yield_d.yieldsp]
group_labels = ['distplot']
fig_kde = ff.create_distplot(hist_data, group_labels, bin_size=0.01)
fig_kde.update_layout(
    title={
        'text': "Histogram, KDE and a Rug Plot of the Yield Spread",
        'y':0.9,
        'x':0.5,
        'xanchor': 'center',
        'yanchor': 'top'})
fig_kde.show()
# calculate kurtosis and skewness, to determine if the data distribution departs from the normal distribution.
# fix: the labels previously claimed these were moments "of normal
# distribution"; they are the sample moments of the yield spread itself.
print( 'Kurtosis of the yield spread distribution: {}'.format(stats.kurtosis(yield_d.yieldsp)))
print( 'Skewness of the yield spread distribution: {}'.format(stats.skew(yield_d.yieldsp)))
def adfuller_test(series, signif = 0.05, name = '', verbose = False):
    """
    Run the Augmented Dickey-Fuller unit-root test on ``series`` and print a
    formatted report.

    Parameters
    ----------
    series : array-like
        Time series to test.
    signif : float
        Significance level for rejecting the null hypothesis.
    name : str
        Label printed in the report header.
    verbose : bool
        Unused; kept for backward compatibility with existing callers.
    """
    # Lag length chosen automatically by AIC.
    r = adfuller(series, autolag = 'AIC')
    output = {'test_statistic': round(r[0], 4), 'pvalue': round(r[1], 4), 'n_lags':round(r[2], 4), 'n_obs':r[3]}
    p_value = output['pvalue']
    def adjust(val, length = 6):
        # Left-justify so the critical-value lines align.
        return str(val).ljust(length)
    print(f' Augmented Dickey-Fuller Test on "{name}"', "\n ", '-'*47)
    # fix: "unit too" -> "unit root" in the printed null hypothesis
    print(f' Null Hypothesis: Data has unit root. Non-Stationary.')
    print(f' Significance Level = {signif}')
    print(f' Test Statistic = {output["test_statistic"]}')
    print(f' No. of Lags Chosen = {output["n_lags"]}')
    for key, val in r[4].items():
        print(f' Critical Value {adjust(key)} = {round(val, 3)}')
    if p_value <= signif:
        print(f" => P-Value = {p_value}. Rejecting null hypothesis.")
        print(f" => Series is stationary.")
    else:
        print(f" => P-Value = {p_value}. Weak evidence to reject the Null Hypothesis.")
        print(f" => Series is non-stationary.")
# Run the ADF stationarity test on every column of the merged dataset.
data_list = yield_d[list(yield_d.columns)]
# items() replaces iteritems(), which was deprecated and removed in pandas 2.0;
# the iteration behavior is identical.
for name, column in data_list.items():
    adfuller_test(column, name = column.name)
    print('\n')
| Variable | p -Value | Stationary? |
|---|---|---|
| yieldsp | 0.1268 | No |
| ted | 0.000 | Yes |
| forward1yr | 0.5191 | No |
| rec_ind | 0.0035 | Yes |
| 1yr-ffr | 0.000 | Yes |
| termpr | 0.595 | No |
| sahm | 0.000 | Yes |
| infexp | 0.000 | Yes |
| vix | 0.000 | Yes |
Here, I get results different from the author for sahm.
# stationarizes the variables by taking the first difference, and concats them with the dataset
yield_d1 = pd.concat([yield_d, yield_d[['forward1yr','termpr','yieldsp']]
                      .diff().add_suffix("_diff")], axis = 1)
# drops rows with NaN values, and replaces inf values with 0
# (the trailing dropna(axis=1) removes any column that is still entirely NaN)
stat1 = yield_d1.dropna(inplace = False).replace([np.inf, -np.inf], 0).dropna(axis=1)
I have done AD Fuller test on the differenced variables once again to verify that they are stationary.
stat_list1 = stat1[['forward1yr_diff', 'termpr_diff','yieldsp_diff']]
# ADF Test on the first differenced values each column
# items() replaces iteritems(), which was deprecated and removed in pandas 2.0;
# the iteration behavior is identical.
for name, column in stat_list1.items():
    adfuller_test(column, name = column.name)
    print('\n')
Thus, we have the following:
| Stationary Variables | ted, rec_ind, 1yr-ffr, vix, infexp, sahm |
| Variables that need first differencing | forward1yr, termpr, yieldsp |
The author has modeled SARIMAX(2,1,2) without any seasonal component and incorporated the stationary exogenous variables.
# 95/5 chronological train-test split
train, test = np.split(stat1, [int(.95 *len(stat1))])
exog_train = train[['ted', 'rec_ind', '1yr-ffr', 'vix', 'termpr_diff', 'forward1yr_diff', 'infexp']]
exog_test = test[['ted', 'rec_ind', '1yr-ffr', 'vix', 'termpr_diff','forward1yr_diff', 'infexp']]
# BUGFIX: '1yr-ffr' was listed twice in both exogenous matrices, duplicating the
# regressor (a perfectly collinear column that splits its coefficient in two).
exog = stat1[['ted', 'rec_ind', '1yr-ffr', 'vix', 'termpr', 'forward1yr', 'infexp', 'sahm']]
exog_test_sarima = test[['ted', 'rec_ind', '1yr-ffr', 'vix', 'termpr', 'forward1yr', 'infexp', 'sahm']]
# SARIMAX(2,1,2) with no seasonal component and the exogenous regressors above
mod = sm.tsa.statespace.SARIMAX(stat1['yieldsp'], exog = exog,
                                order = (2, 1, 2),
                                seasonal_order = (0, 0, 0, 0),
                                enforce_stationarity = True,
                                enforce_invertibility = True)
results = mod.fit()
print(results.summary())
The SARIMAX(2,1,2) model is:
$$ yieldsp_t = 0.4031 ted_t -0.0072 \text{rec_ind}_t + 0.0065 \text{1yrffr}_t - 0.0002 vix_t + 2.4013 \Delta termpr_t - 0.4190 \Delta forward1yr_t - 0.0168 \text{infexp}_t + 0.0259 sahm_t + 0.0065 \text{1yrffr}_t + 1.0625 yieldsp_{t-1} - 0.6652 yieldsp_{t-2} - 0.9914 \epsilon_{t-1} + 0.5403 \epsilon_{t-2}$$with an AIC of -49768.613
# run model diagnostics to ensure that none of the assumptions made by the model have been violated.
results.plot_diagnostics(figsize=(20, 12))
plt.show()
# Dynamic forecast: from 2018-11-24 onward predictions are fed back as inputs
# instead of the observed values.
pred_dynamic = results.get_prediction(start = pd.to_datetime('2018-11-24'), dynamic = True,
                                      exog = exog_test_sarima, full_results = True)
pred_dynamic_ci = pred_dynamic.conf_int(exog = exog_test)
# plot the real and forecasted values of the yield spread
ax = stat1['yieldsp']['2018-10-28':].plot(label='observed', figsize=(20, 10))
pred_dynamic.predicted_mean.plot(label='Dynamic Forecast', ax = ax, fontsize = 16)
# shade the forecast confidence interval
ax.fill_between(pred_dynamic_ci.index,
                pred_dynamic_ci.iloc[:, 0],
                pred_dynamic_ci.iloc[:, 1], color='k', alpha=.25)
# lightly shade the forecast horizon
ax.fill_betweenx(ax.get_ylim(), pd.to_datetime('2018-11-01'), stat1['yieldsp'].index[-1],
                 alpha=.1, zorder=-1)
ax.set_xlabel('Date', fontsize = 18)
ax.set_ylabel('Yield Spread', fontsize = 18)
ax.set_title("Actual and Predicted Yield Spread from SARIMAX(2,1,2)", fontsize = 20)
plt.legend()
plt.show()
# RMSE of the dynamic forecast against the observed spread over the hold-out window
y_forecasted = pred_dynamic.predicted_mean
y_truth = stat1['yieldsp']['2018-11-24':]
# Compute the mean square error
rmse = sqrt(((y_forecasted - y_truth) ** 2).mean())
print('The Root Mean Squared Error of our forecasts is {}'.format(round(rmse, 4)))
exog_test.tail()
SARIMAX(2,1,2) predicts rec_ind 1.0 during September 2020 which is as expected.
The author has applied the Granger causality test to determine which variables have a causal relationship with $\Delta yieldsp$.
Specifying the maximum number of lags specified as 40, the equation to test the null hypothesis is:
$$ \Delta yieldsp_t = \alpha_0 + \sum_{i=1}^{40} \alpha_i \ yieldsp_{t-i} + \sum_{j=1}^{40} \beta_j \ x_{t-j} $$where $x_j$ refers to a stationary explanatory variable.
# granger causality for all variables in the train set
stat_train = train[['termpr_diff', 'forward1yr_diff', 'ted', 'vix', 'yieldsp', 'yieldsp_diff',
                    'rec_ind', 'infexp', '1yr-ffr']]
maxlag=40  # maximum lag order tested by grangercausalitytests below
def grangers_causality_matrix(data, variables, test = 'ssr_chi2test', verbose=False):
    """Pairwise Granger-causality p-value matrix.

    Cell (r, c) holds the minimum p-value over lags 1..maxlag (module-level
    global) for the hypothesis "series c Granger-causes series r".  Rows are
    the response series (suffix _y), columns the predictors (suffix _x).
    """
    matrix = pd.DataFrame(np.zeros((len(variables), len(variables))), columns=variables, index=variables)
    for predictor in matrix.columns:
        for response in matrix.index:
            # grangercausalitytests returns {lag: (test_dict, ...)};
            # extract the p-value of the requested test at every lag
            results = grangercausalitytests(data[[response, predictor]], maxlag=maxlag, verbose=False)
            pvals = [round(results[lag + 1][0][test][1], 4) for lag in range(maxlag)]
            if verbose:
                print(f'Y = {response}, X = {predictor}, P Values = {pvals}')
            matrix.loc[response, predictor] = np.min(pvals)
    matrix.columns = [var + '_x' for var in variables]
    matrix.index = [var + '_y' for var in variables]
    return matrix
# Compute the Granger p-value matrix on the training variables and draw it as a heatmap.
granger_matrix = grangers_causality_matrix(stat_train, variables = stat_train.columns)
plt.figure(figsize = (10,8))
sn.heatmap(granger_matrix, annot=True, cmap="YlGnBu")
The row are the response ($y$) and the columns are the predictor series ($x$).
All the variables except infexp Granger-cause yieldsp, as each of their p-values = 0.00 &lt; 0.05.
The author has used the AIC criteria to find the optimal lag period for the VAR model.
# should use only the stationary variables for VAR
stat_train = train[['yieldsp_diff','termpr_diff', 'forward1yr_diff', 'ted', 'vix', 'rec_ind', '1yr-ffr']]
from statsmodels.tsa.api import VAR
AIC = {}
best_aic, best_order = np.inf, 0
for i in range(1,50):
model = VAR(endog=stat_train.values)
model_result = model.fit(maxlags=i)
AIC[i] = model_result.aic
if AIC[i] < best_aic:
best_aic = AIC[i]
best_order = i
print('Best Order', best_order, 'BEST AIC:', best_aic)
plt.figure(figsize=(20,6))
plt.plot(range(len(AIC)), list(AIC.values()))
plt.plot([best_order-1], [best_aic], marker='o', markersize=8, color="blue")
plt.xticks(range(len(AIC)), range(1,50))
plt.xlabel('lags'); plt.ylabel('AIC')
plt.title("Measure of VAR's Performance via AIC over Lags and the best AIC")
np.set_printoptions(False)
We have aggregated the variables used in VAR(45) as a (7 × 1) vector of series variables:
$$ Y_t = [\Delta yieldsp_t \ \ \Delta termpr_t \ \ \Delta forward1yr_t \ \ 1yrffr_t \ \ \text{rec_ind}_t \ \ ted_t \ \ vix_t]' $$VAR(45) is a seemingly unrelated regression (SUR) model with $k = 7$ deterministic regressors and $l = 45$ lagged variables of the form:
$$ \Delta yieldsp_{t} = \alpha_1 + \sum_{i=1}^{7} \sum_{l=1}^{45} \beta_{i,l}^{\Delta yieldsp} \Delta yieldsp_{t-1} + \epsilon_{\Delta yieldsp_{t}} $$$$ \Delta termpr_{t} = \alpha_2 + \sum_{i=1}^{7} \sum_{l=1}^{45} \beta_{i,l}^{\Delta termpr} \Delta yieldsp_{t-1} + \epsilon_{\Delta termpr_{t}} $$$$ \Delta forward1yr_{t} = \alpha_3 + \sum_{i=1}^{7} \sum_{l=1}^{45} \beta_{i,l}^{\Delta forward1yr} \Delta yieldsp_{t-1} + \epsilon_{\Delta forward1yr_{t}} $$$$ \text{1yr-frr}_{t} = \alpha_4 + \sum_{i=1}^{7} \sum_{l=1}^{45} \beta_{i,l}^{\text{1yr-frr}} \Delta yieldsp_{t-1} + \epsilon_{\text{1yr-frr}_t} $$$$ \text{rec_ind}_{t} = \alpha_5 + \sum_{i=1}^{7} \sum_{l=1}^{45} \beta_{i,l}^{\text{rec_ind}} \Delta yieldsp_{t-1} + \epsilon_{\text{rec_ind}_t} $$$$ ted_{t} = \alpha_6 + \sum_{i=1}^{7} \sum_{l=1}^{45} \beta_{i,l}^{ted} \Delta yieldsp_{t-1} + \epsilon_{ted_{t}} $$$$ vix_{t} = \alpha_7 + \sum_{i=1}^{7} \sum_{l=1}^{45} \beta_{i,l}^{vix} \Delta yieldsp_{t-1} + \epsilon_{vix_{t}} $$where $ \epsilon_{t} = [ \epsilon_{{\Delta yieldsp}_{t}} \ \ \epsilon_{{\Delta termpr}_{t}} \ \ \epsilon_{{\Delta forward1yr}_{t}} \ \ \epsilon_{{1yrffr}_{t}} \ \ \epsilon_{{\text{rec_ind}}_{t}} \ \ \epsilon_{{ted}_{t}} \ \ \epsilon_{{vix}_{t}}]' $ is a white noise process of "innovations".
$ \epsilon_{t} = [ \epsilon_{{\Delta yieldsp}_{t}} \ \ \epsilon_{{\Delta termpr}_{t}} \ \ \epsilon_{{\Delta forward1yr}_{t}} \ \ \epsilon_{{1yrffr}_{t}} \ \ \epsilon_{{\text{rec_ind}}_{t}} \ \ \epsilon_{{ted}_{t}} \ \ \epsilon_{{vix}_{t}}]' $
# fit the VAR model
mod = VAR(stat_train)  # VAR over the stationary training variables
res = mod.fit(best_order)  # best_order chosen by the AIC grid search above
res.summary()
The author claims that the Durbin-Watson statistic for all the variables in the VAR system is 2, implying that their residuals are serially uncorrelated.
from statsmodels.stats.stattools import durbin_watson
# Durbin-Watson statistic of the VAR residuals; values near 2 indicate no serial correlation.
out = durbin_watson(res.resid)
for col, val in zip(stat_train.columns, out):
    print((col), ':', round(val, 2))
lag_order = res.k_ar
train, test = np.split(stat1, [int(.95 *len(stat1))])
stat_test = test[['yieldsp_diff','termpr_diff', 'forward1yr_diff', 'ted', 'vix', 'rec_ind', '1yr-ffr']]
# BUGFIX: seed the forecast with the LAST lag_order observations of the TRAINING
# sample -- the original used the tail of the TEST sample, which leaks future
# information into an out-of-sample forecast (VARResults.forecast expects the
# last k_ar values of the history preceding the forecast window).
input_data = train[['yieldsp_diff','termpr_diff', 'forward1yr_diff', 'ted', 'vix', 'rec_ind', '1yr-ffr']].values[-lag_order:]
pred = res.forecast(y=input_data, steps = len(test))
pred1 = pd.DataFrame(pred, index=stat_test.index, columns= stat_test.columns + '_pred')
pred1.head()
# Align the predicted differenced spread with the observed level on the test index.
merge0 = pd.merge(test['yieldsp'], pred1['yieldsp_diff_pred'].to_frame(), how='inner',
                  left_index=True, right_index=True)
merge0.head()
def invert_diff(history, yhat, interval=1):
    """Undo an `interval`-step difference: add back the observation from `interval` steps ago."""
    return history[-interval] + yhat
# Reconstruct the yield-spread level from the forecasted first differences.
invert0 = invert_diff(merge0['yieldsp'], merge0['yieldsp_diff_pred'], 1).to_frame()
invert0.rename(columns={'yieldsp_diff_pred':'invert_yieldsp_var'}, inplace=True)
merge_invert0 = pd.merge(merge0, invert0, how='inner', left_index=True,
                         right_index=True).drop(['yieldsp_diff_pred'], axis = 1)
merge_invert0.tail()
# RMSE of the VAR forecast against the observed yield spread
error = mean_squared_error(merge_invert0['yieldsp'], merge_invert0['invert_yieldsp_var'])
rmse = sqrt(error)
print('RMSE: %.5f' % rmse)
The author has examined the properties of VAR by doing structural analyses using impulse responses and forecast error variance decomposition.
IRFs trace out the time path of the effects of an exogenous shock $\epsilon_t$ to one (or more) of the endogenous variables on some or all of the other variables in a VAR system given that no future innovations are present in the system.
# Impulse response functions of the fitted VAR over a 10-period horizon.
irf1 = res.irf(10)
irf1.plot(response ='yieldsp_diff');
The impulse response of shocks on $\Delta yieldsp$ decays to 0 by the tenth period, signifying that one-time innovations don’t have long-term ramifications on the paths of $\Delta yieldsp$.
Another way to characterize the dynamics associated with the VAR is by computing the FEVD from the VAR model.
# Forecast error variance decomposition over 5 periods.
fevd = res.fevd(5)
fevd.summary()
The author has made various MLP network by varying the number of neurons (1,3 and 5) with the lag period = 3. The author has also run every experiment 8 times.
So, i have set up a generic function to model all the MLPs.
We can definitely improve here: experiment over a grid of lag periods and neuron counts. This is a very shallow network, and deeper networks could improve results significantly.
def parser(x):
    """Parse a partial year-month string (e.g. '8-01') into a datetime by prefixing '190'.

    BUGFIX: the `datetime` MODULE has no `strptime`; it lives on the
    `datetime.datetime` class (the file imports the module, not the class),
    so the original raised AttributeError.
    """
    return datetime.datetime.strptime('190'+x, '%Y-%m')
def timeseries_to_supervised(data, lag=1):
    """Frame a series as a supervised-learning table: lag columns t-1..t-lag, then t."""
    frame = DataFrame(data)
    shifted = [frame.shift(k) for k in range(1, lag + 1)]
    shifted.append(frame)
    table = pd.concat(shifted, axis=1)
    # the row labeled 0 contains NaN introduced by the shift; remove it
    return table.drop(0)
def difference(dataset, interval=1):
    """Return the `interval`-lag differenced series x[t] - x[t-interval] as a Series."""
    deltas = [dataset[i] - dataset[i - interval] for i in range(interval, len(dataset))]
    return Series(deltas)
def inverse_difference(history, yhat, interval=1):
    """Invert differencing: add the observation `interval` steps back onto `yhat`."""
    return history[-interval] + yhat
def scale(train, test):
    """Fit a [-1, 1] min-max scaler on `train` and transform both arrays.

    Returns (scaler, train_scaled, test_scaled); the fitted scaler is returned
    so predictions can later be mapped back to the original scale.
    """
    scaler = MinMaxScaler(feature_range=(-1, 1)).fit(train)
    train_scaled = scaler.transform(train.reshape(train.shape[0], train.shape[1]))
    test_scaled = scaler.transform(test.reshape(test.shape[0], test.shape[1]))
    return scaler, train_scaled, test_scaled
def invert_scale(scaler, X, yhat):
    """Invert min-max scaling for one row, returning the unscaled prediction (last column)."""
    row = np.array(list(X) + [yhat]).reshape(1, -1)
    return scaler.inverse_transform(row)[0, -1]
def evaluate(model, raw_data, scaled_dataset, scaler, offset, batch_size):
    """Score `model` on a scaled supervised dataset; return RMSE on the raw scale.

    Each prediction is un-scaled with `scaler` and then un-differenced by adding
    the aligned raw observation, before comparison against raw_data[1:].
    (`offset` is unused; kept for backward compatibility.)
    """
    X, y = scaled_dataset[:, 0:-1], scaled_dataset[:, -1]
    output = model.predict(X, batch_size=batch_size)
    predictions = []
    for idx in range(len(output)):
        value = invert_scale(scaler, X[idx], output[idx, 0])
        # undo first-differencing using the matching raw observation
        predictions.append(value + raw_data[idx])
    # evaluation metric
    return sqrt(mean_squared_error(raw_data[1:], predictions))
# fit an MLP network based on the parameters
def fit(train, test, raw, scaler, batch_size, nb_epoch, neurons):
    """Train a one-hidden-layer MLP epoch by epoch, recording train/test RMSE.

    Returns a DataFrame with one row per epoch (columns 'train' and 'test').
    """
    X, y = train[:, 0:-1], train[:, -1]
    # building the model architecture: one ReLU hidden layer, linear output
    model = Sequential()
    model.add(Dense(neurons, activation='relu', input_dim=X.shape[1]))
    model.add(Dense(1))
    model.compile(loss='mean_squared_error', optimizer='adam')
    train_rmse, test_rmse = [], []
    for _ in range(nb_epoch):
        # train one epoch at a time so RMSE can be captured after each pass
        model.fit(X, y, epochs=1, batch_size=batch_size, verbose=0, shuffle=False)
        raw_train = raw[-(len(train) + len(test) + 1):-len(test)]
        train_rmse.append(evaluate(model, raw_train, train, scaler, 0, batch_size))
        raw_test = raw[-(len(test) + 1):]
        test_rmse.append(evaluate(model, raw_test, test, scaler, 0, batch_size))
    history = DataFrame()
    history['train'], history['test'] = train_rmse, test_rmse
    return history
def fit_model(train, batch_size, nb_epoch, neurons):
    """Build and train a one-hidden-layer MLP on a supervised-learning array.

    The last column of `train` is the target; the remaining columns are the
    lagged input features.
    """
    features, target = train[:, 0:-1], train[:, -1]
    network = Sequential()
    network.add(Dense(neurons, activation='relu', input_dim=features.shape[1]))
    network.add(Dense(1))
    network.compile(loss='mean_squared_error', optimizer='adam')
    network.fit(features, target, epochs=nb_epoch, batch_size=batch_size, verbose=1, shuffle=False)
    return network
# building, training and evaluating the various MLP networks
def experiment(repeats, series, epochs, lag, neurons):
    """
    Run `repeats` independent train/evaluate cycles of an MLP on `series`
    and return the list of test RMSEs (one per repeat).
    """
    raw_values = series.values
    # first-difference the series, then frame it as supervised learning with `lag` inputs
    diff_values = difference(raw_values, 1)
    supervised = timeseries_to_supervised(diff_values, lag)
    supervised_values = supervised.values[lag:,:]
    # hold out the last 365 observations as the test set
    train, test = supervised_values[0:-365], supervised_values[-365:]
    scaler, train_scaled, test_scaled = scale(train, test)
    error_scores = list()
    for r in range(repeats):
        batch_size = 16
        model = fit_model(train_scaled, batch_size, epochs, neurons)
        test_reshaped = test_scaled[:,0:-1]
        output = model.predict(test_reshaped, batch_size=batch_size)
        predictions = list()
        for i in range(len(output)):
            yhat = output[i,0]
            X = test_scaled[i, 0:-1]
            # undo scaling, then undo differencing relative to the matching raw observation
            yhat = invert_scale(scaler, X, yhat)
            yhat = inverse_difference(raw_values, yhat, len(test_scaled)+1-i)
            predictions.append(yhat)
        rmse = sqrt(mean_squared_error(raw_values[-365:], predictions))
        print('%d) Test RMSE: %.3f' % (r+1, rmse))
        error_scores.append(rmse)
    return error_scores
series = yieldsp
#config variables for the various setups 0
repeats = 8
results = DataFrame()
epochs = 10
neurons = [1, 3, 5]
# NOTE(review): the narrative says the lag period is fixed at 3, but experiment()
# is called with lag = n (the same value as neurons) -- confirm which was intended.
for n in neurons:
    results[str(n)] = experiment(repeats, series, epochs, n, n)
print(results.describe())
results.boxplot()
pyplot.savefig('boxplot_neurons_lag.png')
The author plots the test and train RMSE for each of the 10 runs.
# run diagnostic experiments
def run():
    """Repeat the MLP fit `repeats` times and plot per-epoch train/test RMSE curves."""
    # config
    repeats = 10
    n_batch = 16
    n_epochs = 20
    n_neurons = 2
    n_lag = 3
    series = yieldsp
    raw_values = series.values
    # difference, frame as supervised learning, hold out the last 365 days as test
    diff_values = difference(raw_values, 1)
    supervised = timeseries_to_supervised(diff_values, n_lag)
    supervised_values = supervised.values[n_lag:,:]
    train, test = supervised_values[0:-365], supervised_values[-365:]
    scaler, train_scaled, test_scaled = scale(train, test)
    # trim the first two rows (NOTE(review): presumably to make the sample count
    # compatible with the batch size -- confirm)
    train_trimmed = train_scaled[2:, :]
    for i in range(repeats):
        history = fit(train_trimmed, test_scaled, raw_values, scaler, n_batch, n_epochs, n_neurons)
        pyplot.plot(history['train'], color='blue')
        pyplot.plot(history['test'], color='orange')
        print('%d) TrainRMSE=%f, TestRMSE=%f' % (i, history['train'].iloc[-1], history['test'].iloc[-1]))
    pyplot.savefig('diagnostic_neurons_lag.png')
run()
The author has constructed a multi-layered LSTM network to forecast the test set values of the yield spread, given the history of observations in the training set.
# Extract the yield spread as a float32 column vector for the LSTM pipeline.
dataset = yieldsp['yieldsp'].values
dataset = dataset.astype('float32')
dataset = np.reshape(dataset, (-1, 1))
dataset.shape
As LSTMs are sensitive to the scale of the input data, particularly when we invoke the sigmoid or the hyperbolic tangent activation functions, we need to normalize the data to the range (0,1).
# Scale the series into (0, 1) before feeding it to the LSTM.
scaler = MinMaxScaler(feature_range=(0, 1))
dataset = scaler.fit_transform(dataset)
# 95/5 chronological train/test split
train, test = np.split(yieldsp, [int(.95 *len(yieldsp))])
train.shape
I have created a class to easily model LSTMs.
from keras.models import Sequential
from keras.layers import LSTM, Dense
class DeepModelTS():
    """
    A class to create a deep (LSTM) time series model.

    Parameters
    ----------
    data : pd.DataFrame
        Frame containing the series to model.
    Y_var : str
        Name of the column to forecast.
    lag : int
        Number of lagged observations used as LSTM input.
    LSTM_layer_depth : int
        Number of units in the LSTM layer.
    epochs, batch_size : int
        Training configuration.
    train_test_split : float
        Fraction of the data held out for validation (0 disables the split).
    patience : int
        Early-stopping patience in epochs without val_loss improvement.
    """
    def __init__(
        self,
        data: pd.DataFrame,
        Y_var: str,
        lag: int,
        LSTM_layer_depth: int,
        epochs=10,
        batch_size=256,
        train_test_split=0,
        patience = 5
    ):
        self.data = data
        self.Y_var = Y_var
        self.lag = lag
        self.LSTM_layer_depth = LSTM_layer_depth
        self.batch_size = batch_size
        self.epochs = epochs
        self.patience = patience
        self.train_test_split = train_test_split
    @staticmethod
    def create_X_Y(ts: list, lag: int) -> tuple:
        """
        A method to create X and Y matrix from a time series list for the training of
        deep learning models
        """
        X, Y = [], []
        if len(ts) - lag <= 0:
            # series shorter than the lag window: use the whole series as one sample
            X.append(ts)
        else:
            for i in range(len(ts) - lag):
                Y.append(ts[i + lag])
                X.append(ts[i:(i + lag)])
        X, Y = np.array(X), np.array(Y)
        # Reshaping the X array to an LSTM input shape (samples, timesteps, features=1)
        X = np.reshape(X, (X.shape[0], X.shape[1], 1))
        return X, Y
    def create_data_for_NN(self,use_last_n=None):
        """
        A method to create data for the LSTM model
        """
        # Extracting the dependent variable we want to model/forecast
        y = self.data[self.Y_var].tolist()
        # Subseting the time series if needed
        if use_last_n is not None:
            y = y[-use_last_n:]
        # The X matrix will hold the lags of Y
        X, Y = self.create_X_Y(y, self.lag)
        # Creating training and test sets (test stays empty when split is 0)
        X_train = X
        X_test = []
        Y_train = Y
        Y_test = []
        if self.train_test_split > 0:
            index = round(len(X) * self.train_test_split)
            X_train = X[:(len(X) - index)]
            X_test = X[-index:]
            Y_train = Y[:(len(X) - index)]
            Y_test = Y[-index:]
        return X_train, X_test, Y_train, Y_test
    def LSTModel(self):
        """
        A method to fit the LSTM model
        """
        # Getting the data
        X_train, X_test, Y_train, Y_test = self.create_data_for_NN()
        # Defining the model: one LSTM layer followed by a linear output unit
        model = Sequential()
        model.add(LSTM(self.LSTM_layer_depth, activation='relu', input_shape=(self.lag, 1)))
        model.add(Dense(1))
        model.compile(optimizer='adam', loss='mse')
        # Fitting the model with early stopping on the validation loss
        model.fit(X_train, Y_train,
                  self.batch_size,
                  self.epochs,
                  validation_data=(X_test, Y_test),
                  callbacks = [EarlyStopping(monitor='val_loss', patience= self.patience)],
                  verbose=1, shuffle=False)
        # Saving the model to the class
        self.model = model
        return model
    def predict(self) -> list:
        """
        A method to predict using the test data used in creating the class
        """
        yhat = []
        if(self.train_test_split > 0):
            # Getting the held-out samples
            _, X_test, _, _ = self.create_data_for_NN()
            # Making the prediction list
            yhat = [y[0] for y in self.model.predict(X_test)]
        return yhat
    def predict_n_ahead(self, n_ahead: int):
        """
        A method to predict n time steps ahead.

        Each one-step forecast is appended to the input window and the oldest
        observation is dropped (recursive forecasting).
        """
        X, _, _, _ = self.create_data_for_NN(use_last_n=self.lag)
        # Making the prediction list
        yhat = []
        for _ in range(n_ahead):
            # Making the prediction
            fc = self.model.predict(X)
            yhat.append(fc)
            # Creating a new input matrix for forecasting
            X = np.append(X, fc)
            # Ommiting the first variable
            X = np.delete(X, 0)
            # Reshaping for the next iteration
            X = np.reshape(X, (1, len(X), 1))
        # BUGFIX: the forecasts were computed but never returned
        return yhat
# Training the model with more lag periods increases the training time:
deep_learner = DeepModelTS(data = train, Y_var = 'yieldsp', lag = 50,
                           LSTM_layer_depth = 50, epochs = 1000 ,batch_size = 64,
                           train_test_split = 0.05, patience = 200)
model = deep_learner.LSTModel()
# Build one input sample from the last `lag` training observations and predict on it.
lag = 50
ts = train['yieldsp'].tail(lag ).values.tolist()
X, _ = deep_learner.create_X_Y(ts, lag=lag )
yhat = model.predict(X)
# to see how our model performs out of sample.
# (this overwrites the single-sample prediction above with the validation-split predictions)
yhat = deep_learner.predict()
fc = train.tail(len(yhat)).copy()
fc['forecast'] = yhat
fc.head()
# Interactive plot of actual vs. predicted yield spread (cufflinks iplot).
fig_lstm = fc.iplot(asFigure=True, kind='scatter', xTitle='Date', yTitle='Yield Spread')
fig_lstm.update_layout(
    title={
        'text': "Actual and Predicted Yield Spread from vanilla LSTM",
        'y':0.9,
        'x':0.5,
        'xanchor': 'center',
        'yanchor': 'top'})
fig_lstm.show()
# RMSE over the sample plotted above
error = mean_squared_error(fc['yieldsp'], fc['forecast'])
rmse = sqrt(error)
print('Train RMSE from vanilla LSTM: %.5f' % rmse)
We can gain finer control over when the internal state of the LSTM network is cleared in Keras by making the LSTM layer “stateful”. This means that it can build state over the entire training sequence and even maintain that state if needed to make predictions.
def create_dataset(dataset, look_back=1):
    """Slice an (n, 1) array into windows of `look_back` steps (X) and the next value (Y).

    NOTE(review): the loop stops at len(dataset) - look_back - 1, so the final
    available window is never used -- confirm whether this off-by-one is intentional
    (downstream plotting indices depend on the current sample counts).
    """
    samples, targets = [], []
    for start in range(len(dataset) - look_back - 1):
        samples.append(dataset[start:(start + look_back), 0])
        targets.append(dataset[start + look_back, 0])
    return np.array(samples), np.array(targets)
np.random.seed(7)  # fix the seed for reproducibility
dataframe = yieldsp
dataset = dataframe.values
dataset = dataset.astype('float32')
# normalize the dataset
scaler = MinMaxScaler(feature_range=(0, 1))
dataset = scaler.fit_transform(dataset)
'''
Keras requires that In a stateful network, you should only pass inputs
with a number of samples that can be divided by the batch size.
Thus, we need to hard code the test-train split, i have done the split in a manner
that we have 10:1 ratio between train and test sizes.
'''
# NOTE(review): the train and test windows overlap by 50 rows, presumably so the
# test set keeps its own warm-up window of `look_back` lags -- confirm.
train, test = dataset[0:8851,:], dataset[8801:-9,:]
# reshape into X=t and Y=t+1
look_back = 50
trainX, trainY = create_dataset(train, look_back)
testX, testY = create_dataset(test, look_back)
# reshape input to be [samples, time steps, features]
trainX = np.reshape(trainX, (trainX.shape[0], trainX.shape[1], 1))
testX = np.reshape(testX, (testX.shape[0], testX.shape[1], 1))
print(len(trainX))
print(len(testX))
# create and fit the stateful LSTM network
batch_size = 110
model = Sequential()
# two stacked stateful LSTM layers of 4 units; stateful layers require a fixed batch size
model.add(LSTM(4, batch_input_shape=(batch_size, look_back, 1), stateful=True, return_sequences=True))
model.add(LSTM(4, batch_input_shape=(batch_size, look_back, 1), stateful=True))
model.add(Dense(1))
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(trainX, trainY, epochs=100, batch_size=batch_size, verbose=1, shuffle=False,
          callbacks = [EarlyStopping(monitor='loss', patience= 20)], )
model.reset_states()  # clear the accumulated LSTM state before predicting
trainPredict = model.predict(trainX, batch_size=batch_size)
model.reset_states()
testPredict = model.predict(testX, batch_size=batch_size)
# undo the (0, 1) scaling so errors are in yield-spread units
trainPredict = scaler.inverse_transform(trainPredict)
trainY = scaler.inverse_transform([trainY])
testPredict = scaler.inverse_transform(testPredict)
testY = scaler.inverse_transform([testY])
# calculate root mean squared error
trainScore = math.sqrt(mean_squared_error(trainY[0], trainPredict[:,0]))
print('Train Score: %.5f RMSE' % (trainScore))
testScore = math.sqrt(mean_squared_error(testY[0], testPredict[:,0]))
print('Test Score: %.5f RMSE' % (testScore))
# shift train predictions for plotting
trainPredictPlot = np.empty_like(dataset)
trainPredictPlot[:, :] = np.nan
trainPredictPlot[look_back:len(trainPredict)+look_back, :] = trainPredict
# shift test predictions for plotting
testPredictPlot = np.empty_like(dataset)
testPredictPlot[:, :] = np.nan
# NOTE(review): 9730 is a hard-coded end index tied to this particular sample size -- confirm
testPredictPlot[len(trainPredict) + look_back:9730, :] = testPredict
# combine the predicted and actual data
train_pred = pd.DataFrame(trainPredictPlot, columns = ['train_pred'])
test_pred = pd.DataFrame(testPredictPlot, columns = ['test_pred'])
pred = pd.merge(train_pred, test_pred, on = test_pred.index)
pred1 = pred[['train_pred', 'test_pred']]
# align predictions with the observed series on the date index and plot
merged = pd.merge(yieldsp, pred1, on = yieldsp.index).rename(columns= {'key_0': 'Date'}).set_index('Date')
fig_lstm = merged.iplot(asFigure=True, kind='scatter', xTitle='Date', yTitle='Yield Spread')
fig_lstm
def series_to_supervised(data, n_in=1, n_out=1, dropnan=True):
    """Reframe a (multivariate) series as a supervised-learning table.

    Columns are named var%d(t-k) for the n_in input lags and var%d(t)/var%d(t+k)
    for the n_out output steps.  Rows with NaN (from shifting) are dropped when
    `dropnan` is True.
    """
    n_vars = 1 if type(data) is list else data.shape[1]
    frame = DataFrame(data)
    pieces, labels = [], []
    # input lags: t-n_in ... t-1
    for shift_by in range(n_in, 0, -1):
        pieces.append(frame.shift(shift_by))
        labels.extend('var%d(t-%d)' % (v + 1, shift_by) for v in range(n_vars))
    # output steps: t ... t+n_out-1
    for step in range(n_out):
        pieces.append(frame.shift(-step))
        if step == 0:
            labels.extend('var%d(t)' % (v + 1) for v in range(n_vars))
        else:
            labels.extend('var%d(t+%d)' % (v + 1, step) for v in range(n_vars))
    table = pd.concat(pieces, axis=1)
    table.columns = labels
    if dropnan:
        table.dropna(inplace=True)
    return table
values = yield_d.values
# encode the categorical column (index 4) as integers before scaling
# NOTE(review): column 4 is presumed to be rec_ind -- confirm against yield_d's column order
encoder = LabelEncoder()
values[:,4] = encoder.fit_transform(values[:,4])
values = values.astype('float32')
scaler = MinMaxScaler(feature_range=(0, 1))
scaled = scaler.fit_transform(values)
# frame as supervised learning with one lag of every variable
reframed = series_to_supervised(scaled, 1, 1)
# drop the time-t columns of the non-target variables, keeping only the target
reframed.drop(reframed.columns[[ 10, 11, 12,13, 14, 15, 16, 17]], axis=1, inplace=True)
reframed.head()
values = reframed.values
# 95/5 chronological split
n_train_hours = int(.95 *len(reframed))
train = values[:n_train_hours, :]
test = values[n_train_hours:, :]
# last column is the target, the rest are lagged inputs
train_X, train_y = train[:, :-1], train[:, -1]
test_X, test_y = test[:, :-1], test[:, -1]
# reshape to [samples, timesteps=1, features] for the LSTM
train_X = train_X.reshape((train_X.shape[0], 1, train_X.shape[1]))
test_X = test_X.reshape((test_X.shape[0], 1, test_X.shape[1]))
print(train_X.shape, train_y.shape, test_X.shape, test_y.shape)
# Multivariate LSTM: 25 units, MAE loss, RMSprop optimizer.
model = Sequential()
model.add(LSTM(25, input_shape=(train_X.shape[1], train_X.shape[2])))
model.add(Dense(1))
model.compile(loss='mae', optimizer='RMSprop')
# fit network
history = model.fit(train_X, train_y, epochs=500, batch_size=50, validation_data=(test_X, test_y),
                    # callbacks = [EarlyStopping(monitor='val_loss', patience= 100)],
                    verbose=1, shuffle=False)
# plot training vs. validation loss per epoch
mod_loss1 = pd.DataFrame(history.history, columns=['loss', 'val_loss'])
fig_loss1 = mod_loss1.iplot(asFigure=True, kind='scatter', xTitle='Epochs', yTitle='Loss')
fig_loss1.update_layout(
    title={
        'text': 'Training and Validation loss of the multivariate LSTM model ',
        'y':0.9,
        'x':0.5,
        'xanchor': 'center',
        'yanchor': 'top'})
fig_loss1.show()
# forecast for the entire dataset
yhat = model.predict(test_X)
test_X = test_X.reshape((test_X.shape[0], test_X.shape[2]))
# invert scaling for forecast
# (rebuild a full-width row so the fitted MinMaxScaler can be inverted, then keep column 0)
inv_yhat = concatenate((yhat, test_X[:, 1:]), axis=1)
inv_yhat = scaler.inverse_transform(inv_yhat)
inv_yhat = inv_yhat[:,0]
# invert scaling for actual
test_y = test_y.reshape((len(test_y), 1))
inv_y = concatenate((test_y, test_X[:, 1:]), axis=1)
inv_y = scaler.inverse_transform(inv_y)
inv_y = inv_y[:,0]
# calculate RMSE
rmse = sqrt(mean_squared_error(inv_y, inv_yhat))
print('Test RMSE: %.5f' % rmse)
# Align actual and forecasted series on the test dates and plot them.
train_data, test_data = np.split(yield_d, [int(.95 *len(yield_d))])
invert = pd.DataFrame(inv_y, columns = ['yieldsp_lstm'])
yhat_lstm = pd.DataFrame(inv_yhat, columns = ['inv_yhat_lstm'])
lstm = pd.merge(invert, yhat_lstm,
                on = test_data.index).rename(columns= {'key_0': 'Date'}).set_index('Date')
fig_lstm_d = lstm.iplot(asFigure=True, kind='scatter', xTitle='Date', yTitle='Yield Spread')
fig_lstm_d.update_layout(
    title={
        'text': 'Actual and Forecasted Yield Spread using Multivariate LSTM model',
        'y':0.9,
        'x':0.5,
        'xanchor': 'center',
        'yanchor': 'top'})
fig_lstm_d.show()
| Model | RMSE |
|---|---|
| ARIMA(1,0,3) | 0.05110 |
| Differenced ARIMA(3,1,3) | 0.05147 |
| SARIMAX(2,1,2) | 0.4744 |
| VAR(45) | 0.50267 |
| MLP | 0.055239 |
| Stateful stacked LSTM | 0.06081 |
| Multivariate LSTM | 0.07118 |
The results presented here are from the replication-minimal notebook and differ slightly from the ones obtained in this notebook due to the randomness in training neural nets.